package com.atguigu.realtime.app.dwd.db;


import com.atguigu.realtime.app.BaseSqlApp;
import com.atguigu.realtime.common.Constant;
import com.atguigu.realtime.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;


/**
 * com.atguigu.realtime.app.dwd.db.Dwd_14_DwdInteractionFavorAdd
 * Interaction domain: "add to favorites" transaction fact table (exercise).
 *
 * <p>Reads favorites data from {@code ods_db} and writes it to the Kafka
 * favorites topic. When a user favorites a product, a row is inserted into the
 * business database's {@code favor_info} table, so keeping only change records
 * whose operation type is {@code insert} is sufficient.
 */
public class Dwd_14_DwdInteractionFavorAdd extends BaseSqlApp {

    /**
     * Name of this job. Used both as the job identifier passed to {@code init}
     * and as the identifier passed to {@code readOdsDb}, so the two can never
     * drift apart (the original passed another job's name,
     * "Dwd_11_DwdToolCouponGet", to {@code readOdsDb}).
     */
    private static final String APP_NAME = "Dwd_14_DwdInteractionFavorAdd";

    /**
     * Job entry point.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // NOTE(review): 3014 and 2 are presumably the local web UI port and the
        // job parallelism - confirm against BaseSqlApp.init.
        new Dwd_14_DwdInteractionFavorAdd().init(
                3014, 2,
                APP_NAME
        );
    }

    /**
     * Builds the pipeline: filters favorites inserts out of {@code ods_db} and
     * inserts them into the Kafka-backed {@code dwd_interaction_favor_add} table.
     *
     * @param env  the stream execution environment (not used directly here)
     * @param tEnv the table environment on which all tables and queries are registered
     */
    @Override
    protected void handle(StreamExecutionEnvironment env,
                          StreamTableEnvironment tEnv) {
        // NOTE(review): the query below is a plain filter/projection, so this
        // idle-state retention is probably not needed - kept to preserve behavior.
        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));

        // 1. Read ods_db.
        // NOTE(review): the second argument is presumably the Kafka consumer group
        // id - confirm against BaseSqlApp.readOdsDb. It must be this job's own
        // name so the job does not share a group with Dwd_11_DwdToolCouponGet.
        readOdsDb(tEnv, APP_NAME);

        // 2. Select newly inserted rows of the favor_info table.
        Table result = tEnv.sqlQuery("select\n" +
                "data['id'] id,\n" +
                "data['user_id'] user_id,\n" +
                "data['sku_id'] sku_id,\n" +
                "date_format(data['create_time'],'yyyy-MM-dd') date_id,\n" +
                "data['create_time'] create_time,\n" +
                "ts\n" +
                "from ods_db\n" +
                "where `database`='gmall2022' \n" +
                "and `table` = 'favor_info'\n" +
                "and `type` = 'insert'\n");

        // 3. Declare the sink table; its columns mirror the select list above.
        tEnv.executeSql("create table dwd_interaction_favor_add(\n" +
                "id string,\n" +
                "user_id string,\n" +
                "sku_id string,\n" +
                "date_id string,\n" +
                "create_time string,\n" +
                "ts bigint \n" +
                ")" + SQLUtil.getKafkaSink(Constant.TOPIC_DWD_INTERACTION_FAVOR_ADD));

        // 4. Write the filtered rows to the sink table.
        result.executeInsert("dwd_interaction_favor_add");
    }
}

/*

 */